package com.lambkit.plugin.redis.mq;

import cn.hutool.db.nosql.redis.RedisDS;
import com.lambkit.core.Lambkit;
import com.lambkit.core.mq.MqType;
import com.lambkit.core.mq.Sender;
import com.lambkit.plugin.redis.RedisCache;
import com.lambkit.plugin.redis.serializer.ProtostuffSerializer;
import redis.clients.jedis.StreamEntryID;

import java.io.IOException;
import java.util.Collections;

/**
 * {@link Sender} that appends messages to a Redis stream.
 *
 * <p>Every message is written with {@code XADD} to the stream named by
 * {@link #getName()}, as a single field {@code "message"} whose value is the
 * message's {@code toString()} representation.
 *
 * <p>The Redis client is created lazily from the {@code "redisDS"} attribute
 * of the Lambkit context; see {@link #redis()}.
 *
 * @author yangyong(孤竹行)
 */
public class RedisStreamSender implements Sender {
    /** Name of the Redis stream that messages are appended to. */
    private final String streamName = "lambkitStream";

    /** Lazily created Redis client; stays {@code null} until a data source is available. */
    private RedisCache redis;

    /**
     * Returns the Redis client, creating it on first use from the
     * {@code "redisDS"} attribute of the Lambkit context.
     *
     * <p>If the attribute is absent nothing is cached, so the lookup is
     * repeated on the next call.
     *
     * @return the Redis client, or {@code null} when the context holds no
     *         {@code "redisDS"} attribute
     */
    public RedisCache redis() {
        if (redis == null) {
            RedisDS redisDS = Lambkit.context().getAttr("redisDS");
            if (redisDS != null) {
                redis = new RedisCache(redisDS, new ProtostuffSerializer());
            }
        }
        return redis;
    }

    /**
     * Appends {@code message} to the stream as a new entry with a
     * server-generated id.
     *
     * @param message the payload; its {@code toString()} value is stored under
     *                the field {@code "message"}
     * @param bSync   ignored by this implementation
     * @throws NullPointerException  if {@code message} is {@code null}
     * @throws IllegalStateException if no {@code "redisDS"} attribute is
     *                               registered in the Lambkit context
     * @throws IOException           declared by {@link Sender}; not thrown by this method itself
     * @throws InterruptedException  declared by {@link Sender}; not thrown by this method itself
     */
    @Override
    public void publish(Object message, boolean bSync) throws IOException, InterruptedException {
        // Validate before touching Redis so a bad argument never acquires a connection.
        if (message == null) {
            throw new NullPointerException("message must not be null");
        }
        RedisCache cache = redis();
        if (cache == null) {
            throw new IllegalStateException(
                    "Cannot publish to stream '" + streamName + "': no 'redisDS' attribute in the Lambkit context");
        }
        // NOTE(review): the handle returned by getJedis() is not closed here. If RedisCache
        // hands out pooled connections this leaks one per publish - confirm against RedisCache.
        cache.getJedis().xadd(streamName, StreamEntryID.NEW_ENTRY,
                Collections.singletonMap("message", message.toString()));
    }

    /**
     * @return the name of the Redis stream this sender writes to
     */
    @Override
    public String getName() {
        return streamName;
    }

    /**
     * @return always {@link MqType#Queue}
     */
    @Override
    public MqType getType() {
        return MqType.Queue;
    }

    /**
     * Does nothing; this sender does not release the Redis client.
     */
    @Override
    public void close() throws IOException {
        // Intentionally empty.
    }
}
